定期装载
- 初始装载只在开始数据仓库使用前执行一次,然而,必须要按时调度定期执行装载源数据的过程。与初始装载不同,定期装载一般都是增量的,需要捕获并且记录数据的变化历史。本节说明执行定期装载的步骤,包括识别源数据与装载类型、使用HiveQL开发和测试定期装载过程。
- 定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。
源数据 | RDS | 数据仓库 | 抽取模式 | 维度历史装载类型 |
---|---|---|---|---|
customer | customer | customer_dim | 整体、拉取 | address列上SCD2,name列上SCD1 |
product | product | product_dim | 整体、拉取 | SCD2 |
sales_order | sales_order | order_dim | CDC(每天)、拉取 | 唯一订单号 |
sales_order | sales_order | sales_order_fact | CDC(每天)、拉取 | n/a |
n/a | n/a | date_dim | n/a | 预装载 |
前期准备
order_dim维度表和sales_order_fact事实表使用基于时间戳的CDC装载模式。为此在rds库中建立一个名为cdc_time的时间戳表。
-- Create the CDC timestamp table in the rds database and seed it.
-- last_load / current_load hold the lower / upper bound of the extraction window.
USE rds;

DROP TABLE IF EXISTS cdc_time;

CREATE TABLE cdc_time
(
    last_load    date,
    current_load date
);

-- Hive substitutes ${hivevar:...} as text, so the variable expands to the
-- DATE_ADD expression itself (yesterday's date).
SET hivevar:last_load = DATE_ADD(CURRENT_DATE(),-1);

-- Seed both bounds with yesterday.
INSERT OVERWRITE TABLE cdc_time SELECT ${hivevar:last_load}, ${hivevar:last_load};
编写装载脚本
使用下面的regular_etl.sh脚本完成定期装载过程。
#!/bin/bash
# Periodic load driver: refreshes the rds staging tables with Sqoop, then runs
# the Hive ETL script regular_etl.sql.

# Abort on the first failing step. Without this, a failed import would still be
# followed by regular_etl.sql, which moves the rds.cdc_time window forward and
# would skip the rows that were never imported.
set -euo pipefail

# Quoted so the shell never interprets '?' as a glob character.
JDBC_URL='jdbc:mysql://localhost:3306/source?useSSL=false'

# Full pull of the customer and product tables, overwriting the Hive copies.
# NOTE(review): the password is passed in clear text on the command line;
# consider Sqoop's -P or --password-file instead.
for table in customer product; do
    sqoop import --connect "${JDBC_URL}" --username root --password scott5183 \
        --table "${table}" --hive-import --hive-table "rds.${table}" --hive-overwrite
done

# Incremental import via the saved Sqoop job (the job is defined outside this script).
sqoop job --exec myjob_incremental_import

# Run the periodic load implemented in regular_etl.sql.
hive -f regular_etl.sql --hiveconf hive.mapred.mode=nonstrict
设置数据处理时间窗口
在脚本中设置三个变量,分别赋予起始时间点、终止时间点、最大时间点的值,并且将时间戳表rds.cdc_time的last_load和current_load字段分别设置为起始时间点和终止时间点。这些变量会在后面的脚本中多次引用。
-- Effective / expiry dates used by the SCD processing.
-- Hive substitutes ${hivevar:...} as text, so each variable expands to the
-- expression on the right-hand side wherever it is referenced.
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
-- Set the upper bound of the CDC window to the current date; last_load is kept as is.
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;
装载客户维度表
客户维度表的customer_street_address字段值变化时采用SCD2,需要新增版本;customer_name字段值变化时采用SCD1,直接覆盖更新。
-- Load the customer dimension.
-- Expire the current version (expiry_date = max_date) of every customer that was
-- deleted from the source or whose customer_street_address changed (SCD2).
UPDATE customer_dim
SET expiry_date = ${hivevar:pre_date}
WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
 FROM (SELECT customer_sk, customer_number, customer_street_address
       FROM customer_dim
       WHERE expiry_date = ${hivevar:max_date}) a
 LEFT JOIN rds.customer b ON a.customer_number = b.customer_number
 WHERE b.customer_number IS NULL
    OR a.customer_street_address <> b.customer_street_address);
-- Insert a new current version for customers whose customer_street_address changed (SCD2).
-- Inner query: t1 = dimension row whose expiry_date equals pre_date, t2 = source row,
-- t3 = an existing current version; "t3.customer_sk IS NULL" skips customers that
-- already have a current row.
-- The new surrogate key is ROW_NUMBER() added to the current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
    SELECT
        t2.customer_number customer_number,
        t2.customer_name customer_name,
        t2.customer_street_address customer_street_address,
        t2.customer_zip_code,
        t2.customer_city,
        t2.customer_state,
        t1.version + 1 version,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
    FROM customer_dim t1
    INNER JOIN rds.customer t2
        ON t1.customer_number = t2.customer_number
        AND t1.expiry_date = ${hivevar:pre_date}
    LEFT JOIN customer_dim t3
        ON t1.customer_number = t3.customer_number
        AND t3.expiry_date = ${hivevar:max_date}
    WHERE t1.customer_street_address <> t2.customer_street_address AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;
装载产品维度表
-- Load the product dimension.
-- Expire the current version (expiry_date = max_date) of every product that was
-- deleted from the source or whose product_name / product_category changed (SCD2).
UPDATE product_dim
SET expiry_date = ${hivevar:pre_date}
WHERE product_dim.product_sk IN
(SELECT a.product_sk
 FROM (SELECT product_sk, product_code, product_name, product_category
       FROM product_dim
       WHERE expiry_date = ${hivevar:max_date}) a
 LEFT JOIN rds.product b ON a.product_code = b.product_code
 WHERE b.product_code IS NULL
    OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));
装载订单维度表
-- Load the order dimension: one version-1 row per sales order whose entry_date
-- falls inside the CDC window [last_load, current_load).
-- The surrogate key is ROW_NUMBER() added to the current maximum order_sk.
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY new_orders.order_number) + key_base.sk_max,
    new_orders.order_number,
    new_orders.version,
    new_orders.effective_date,
    new_orders.expiry_date
FROM
(
    SELECT
        order_number,
        1 AS version,
        order_date AS effective_date,
        '2200-01-01' AS expiry_date
    FROM rds.sales_order
    CROSS JOIN rds.cdc_time
    WHERE entry_date >= last_load
      AND entry_date < current_load
) new_orders
CROSS JOIN
(SELECT COALESCE(MAX(order_sk), 0) AS sk_max FROM order_dim) key_base;
装载销售事实表
-- Load the sales order fact table for orders entered inside the CDC window
-- [last_load, current_load).
-- Each order is matched to the customer / product dimension version that was in
-- effect on the order date (effective_date <= order_date < expiry_date).
-- `date` is quoted with backticks because it is a keyword in Hive.
-- Only equality conditions are placed in ON; the range predicates stay in WHERE.
INSERT INTO sales_order_fact
SELECT
    b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk,
    a.order_amount
FROM rds.sales_order a
INNER JOIN order_dim b
    ON a.order_number = b.order_number
INNER JOIN customer_dim c
    ON a.customer_number = c.customer_number
INNER JOIN product_dim d
    ON a.product_code = d.product_code
INNER JOIN date_dim e
    ON to_date(a.order_date) = e.`date`
CROSS JOIN rds.cdc_time f
WHERE a.order_date >= c.effective_date
  AND a.order_date < c.expiry_date
  AND a.order_date >= d.effective_date
  AND a.order_date < d.expiry_date
  AND a.entry_date >= f.last_load
  AND a.entry_date < f.current_load;
更新时间窗口
-- Advance the CDC window: set last_load to the current_load value just processed.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    t.current_load,
    t.current_load
FROM rds.cdc_time t;
总脚本
-- Session settings that enable Hive transaction support.
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;
USE dw;
-- Effective / expiry dates used by the SCD processing.
-- Hive substitutes ${hivevar:...} as text, so each variable expands to the
-- expression on the right-hand side wherever it is referenced.
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);
-- Set the upper bound of the CDC window to the current date; last_load is kept as is.
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;
-- Load the customer dimension.
-- Expire the current version (expiry_date = max_date) of every customer that was
-- deleted from the source or whose customer_street_address changed (SCD2).
UPDATE customer_dim
SET expiry_date = ${hivevar:pre_date}
WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
 FROM (SELECT customer_sk, customer_number, customer_street_address
       FROM customer_dim
       WHERE expiry_date = ${hivevar:max_date}) a
 LEFT JOIN rds.customer b ON a.customer_number = b.customer_number
 WHERE b.customer_number IS NULL
    OR a.customer_street_address <> b.customer_street_address);
-- Insert a new current version for customers whose customer_street_address changed (SCD2).
-- Inner query: t1 = dimension row whose expiry_date equals pre_date, t2 = source row,
-- t3 = an existing current version; "t3.customer_sk IS NULL" skips customers that
-- already have a current row.
-- The new surrogate key is ROW_NUMBER() added to the current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
    SELECT
        t2.customer_number customer_number,
        t2.customer_name customer_name,
        t2.customer_street_address customer_street_address,
        t2.customer_zip_code,
        t2.customer_city,
        t2.customer_state,
        t1.version + 1 version,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
    FROM customer_dim t1
    INNER JOIN rds.customer t2
        ON t1.customer_number = t2.customer_number
        AND t1.expiry_date = ${hivevar:pre_date}
    LEFT JOIN customer_dim t3
        ON t1.customer_number = t3.customer_number
        AND t3.expiry_date = ${hivevar:max_date}
    WHERE t1.customer_street_address <> t2.customer_street_address AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;
-- Apply SCD1 to customer_name.
-- Hive's UPDATE ... SET clause does not yet accept a subquery, so the rows to change
-- are staged in a scratch table and replaced with DELETE followed by INSERT.
-- Concurrency is ignored for simplicity: warehouse workloads are mostly read-only and
-- rarely write concurrently, so the risk is much lower than in an OLTP system.
-- SCD1 keeps no history, so every version of a customer whose name changed is
-- rewritten, not only the current version.
DROP TABLE IF EXISTS tmp;

CREATE TABLE tmp AS
SELECT
    dim.customer_sk,
    dim.customer_number,
    src.customer_name,
    dim.customer_street_address,
    dim.customer_zip_code,
    dim.customer_city,
    dim.customer_state,
    dim.version,
    dim.effective_date,
    dim.expiry_date
FROM customer_dim dim
INNER JOIN rds.customer src
    ON dim.customer_number = src.customer_number
WHERE dim.customer_name <> src.customer_name;

DELETE FROM customer_dim
WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);

INSERT INTO customer_dim
SELECT * FROM tmp;
-- Insert customers that exist in the source but have no row at all in customer_dim.
-- They start at version 1, effective from pre_date and open-ended (max_date).
-- The surrogate key is ROW_NUMBER() added to the current maximum customer_sk.
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
    SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
    WHERE t2.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;
-- Load the product dimension.
-- Expire the current version (expiry_date = max_date) of every product that was
-- deleted from the source or whose product_name / product_category changed (SCD2).
UPDATE product_dim
SET expiry_date = ${hivevar:pre_date}
WHERE product_dim.product_sk IN
(SELECT a.product_sk
 FROM (SELECT product_sk, product_code, product_name, product_category
       FROM product_dim
       WHERE expiry_date = ${hivevar:max_date}) a
 LEFT JOIN rds.product b ON a.product_code = b.product_code
 WHERE b.product_code IS NULL
    OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));
-- Insert a new current version for products whose product_name or product_category
-- changed (SCD2).
-- Inner query: t1 = dimension row whose expiry_date equals pre_date, t2 = source row,
-- t3 = an existing current version; "t3.product_sk IS NULL" skips products that
-- already have a current row.
-- The new surrogate key is ROW_NUMBER() added to the current maximum product_sk.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
    SELECT
        t2.product_code product_code,
        t2.product_name product_name,
        t2.product_category product_category,
        t1.version + 1 version,
        ${hivevar:pre_date} effective_date,
        ${hivevar:max_date} expiry_date
    FROM product_dim t1
    INNER JOIN rds.product t2
        ON t1.product_code = t2.product_code
        AND t1.expiry_date = ${hivevar:pre_date}
    LEFT JOIN product_dim t3
        ON t1.product_code = t3.product_code
        AND t3.expiry_date = ${hivevar:max_date}
    WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;
-- Insert products that exist in the source but have no row at all in product_dim.
-- They start at version 1, effective from pre_date and open-ended (max_date).
-- The surrogate key is ROW_NUMBER() added to the current maximum product_sk.
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
    SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
    WHERE t2.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;
-- Load the order dimension: one version-1 row per sales order whose entry_date
-- falls inside the CDC window [last_load, current_load).
-- The surrogate key is ROW_NUMBER() added to the current maximum order_sk.
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY new_orders.order_number) + key_base.sk_max,
    new_orders.order_number,
    new_orders.version,
    new_orders.effective_date,
    new_orders.expiry_date
FROM
(
    SELECT
        order_number,
        1 AS version,
        order_date AS effective_date,
        '2200-01-01' AS expiry_date
    FROM rds.sales_order
    CROSS JOIN rds.cdc_time
    WHERE entry_date >= last_load
      AND entry_date < current_load
) new_orders
CROSS JOIN
(SELECT COALESCE(MAX(order_sk), 0) AS sk_max FROM order_dim) key_base;
-- Load the sales order fact table for orders entered inside the CDC window
-- [last_load, current_load).
-- Each order is matched to the customer / product dimension version that was in
-- effect on the order date (effective_date <= order_date < expiry_date).
-- Only equality conditions are placed in ON; the range predicates stay in WHERE.
INSERT INTO sales_order_fact
SELECT
    b.order_sk,
    c.customer_sk,
    d.product_sk,
    e.date_sk,
    a.order_amount
FROM rds.sales_order a
INNER JOIN order_dim b
    ON a.order_number = b.order_number
INNER JOIN customer_dim c
    ON a.customer_number = c.customer_number
INNER JOIN product_dim d
    ON a.product_code = d.product_code
INNER JOIN date_dim e
    ON to_date(a.order_date) = e.`date`
CROSS JOIN rds.cdc_time f
WHERE a.order_date >= c.effective_date
  AND a.order_date < c.expiry_date
  AND a.order_date >= d.effective_date
  AND a.order_date < d.expiry_date
  AND a.entry_date >= f.last_load
  AND a.entry_date < f.current_load;
-- Advance the CDC window: set last_load to the current_load value just processed.
INSERT OVERWRITE TABLE rds.cdc_time
SELECT
    t.current_load,
    t.current_load
FROM rds.cdc_time t;
测试执行脚本
执行MySQL脚本更新源数据
USE source;
/***
Customer data changes:
Customer 6: street address changed to 7777 Ritter Rd. (was 7070 Ritter Rd.)
Customer 7: name changed to Distinguished Agencies (was Distinguished Partners)
An eighth customer is added.
***/
UPDATE customer SET customer_street_address = '7777 Ritter Rd.' WHERE customer_number = 6 ;
UPDATE customer SET customer_name = 'Distinguished Agencies' WHERE customer_number = 7 ;
INSERT INTO customer
(customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)
VALUES
('Subsidiaries', '10000 Wetline Blvd.', 17055, 'Pittsburgh', 'PA') ;
/***
Product data changes:
Product 3: name changed to Flat Panel (was LCD Panel)
A fourth product is added.
***/
UPDATE product SET product_name = 'Flat Panel' WHERE product_code = 3 ;
INSERT INTO product
(product_name, product_category)
VALUES
('Keyboard', 'Peripheral') ;
/***
Add 16 orders with an order date of July 4, 2016.
***/
-- Bounds of the order-date range as Unix timestamps: [2016-07-04, 2016-07-05).
SET @start_date := unix_timestamp('2016-07-04');
SET @end_date := unix_timestamp('2016-07-05');
DROP TABLE IF EXISTS temp_sales_order_data;
-- Empty staging table with the same columns as sales_order (WHERE 1=0 copies no rows).
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;
-- Each group of three statements below draws a random timestamp inside the range and a
-- random integer amount from 1000 to 9999, then stages one order. The same @order_date
-- fills the 4th and 5th columns, which the final INSERT reads as order_date and entry_date.
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (101, 1, 1, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (102, 2, 2, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (103, 3, 3, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (104, 4, 4, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (105, 5, 2, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (106, 6, 2, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (107, 7, 3, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (108, 8, 4, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (109, 1, 1, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (110, 2, 2, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (111, 3, 3, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (112, 4, 4, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (113, 5, 1, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (114, 6, 2, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (115, 7, 3, @order_date, @order_date, @amount);
SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
INSERT INTO temp_sales_order_data VALUES (116, 8, 4, @order_date, @order_date, @amount);
-- Copy the staged orders into sales_order in order_date order. The staged first column
-- is replaced by NULL; presumably sales_order generates the order number itself
-- (TODO confirm against the table definition).
INSERT INTO sales_order
SELECT NULL,customer_number,product_code,order_date,entry_date,order_amount FROM temp_sales_order_data ORDER BY order_date;
COMMIT ;
在MySQL中执行:
-- Run the source-data update script from the mysql client.
source /home/scott/Documents/update.sql;
查看
-- Inspect the dimension and fact tables in the dw database.
USE dw;
SELECT * FROM customer_dim;
SELECT * FROM product_dim;
SELECT * FROM order_dim;
SELECT * FROM sales_order_fact;
SELECT * FROM date_dim;
执行regular_etl.sh脚本进行定期装载
# Run the periodic load: the Sqoop imports followed by the Hive ETL script.
./regular_etl.sh